Elasticsearch-এ Data Processing Pipelines তৈরি করতে Ingest Pipelines ব্যবহার করা হয়। Ingest Pipelines হলো ডেটা প্রসেসিংয়ের জন্য ব্যবহৃত একটি ফিচার, যা Elasticsearch-এ ডেটা ইনডেক্স করার পূর্বে বিভিন্ন প্রসেসিং স্টেপ বা ট্রান্সফরমেশন প্রয়োগ করে। এটি মূলত ডেটা ক্লিনিং, এনরিচমেন্ট, ফরম্যাটিং, এবং অন্যান্য ট্রান্সফরমেশন করতে সাহায্য করে। Ingest Pipelines তৈরি করা এবং ব্যবহারের মাধ্যমে আপনি ডেটা ইনডেক্স করার আগে সেটি কাস্টমাইজ করতে পারবেন এবং বিভিন্ন প্রসেসিং লজিক প্রয়োগ করতে পারবেন।
Elasticsearch এ Ingest Pipeline হলো একটি স্টেপ-বাই-স্টেপ প্রসেসিং চেইন, যেখানে বিভিন্ন প্রসেসর (processors) ব্যবহার করা হয় ডেটা ট্রান্সফরমেশনের জন্য। প্রতিটি প্রসেসর একটি নির্দিষ্ট অপারেশন করে, যেমন একটি ফিল্ড অ্যাড করা, ফিল্ডের মান পরিবর্তন করা, ডেটা ফরম্যাট করা, ইত্যাদি।
Elasticsearch Node Configuration:
elasticsearch.yml
ফাইলে নিচের লাইনটি চেক করুন:node.ingest: true
Pipeline তৈরি করা:
PUT
রিকোয়েস্ট ব্যবহার করতে পারেন এবং বিভিন্ন প্রসেসর সংজ্ঞায়িত করতে পারেন। নিচে একটি সাধারণ উদাহরণ দেওয়া হলো যেখানে একটি পাইপলাইন তৈরি করা হয়েছে:PUT /_ingest/pipeline/my-pipeline
{
"description": "A simple pipeline for processing log data",
"processors": [
{
"set": {
"field": "ingested_at",
"value": "{{_ingest.timestamp}}"
}
},
{
"rename": {
"field": "message",
"target_field": "log_message"
}
},
{
"lowercase": {
"field": "log_level"
}
}
]
}
ingested_at
নামে একটি নতুন ফিল্ড তৈরি করছে এবং এতে ডেটা ইনজেস্ট করার সময় যোগ করছে।message
ফিল্ডের নাম পরিবর্তন করে log_message
করছে।log_level
ফিল্ডের মানকে ছোট অক্ষরে কনভার্ট করছে।Pipeline ব্যবহার করে ডেটা ইনডেক্স করা:
pipeline
প্যারামিটার ব্যবহার করে নির্দিষ্ট পাইপলাইন উল্লেখ করতে পারেন:POST /my-index/_doc?pipeline=my-pipeline
{
"message": "User logged in successfully",
"log_level": "INFO",
"user_id": 12345
}
my-index
ইনডেক্সে একটি ডকুমেন্ট ইনডেক্স করবে এবং my-pipeline
প্রসেসিং চেইন প্রয়োগ করবে। ডকুমেন্টটি ইনজেস্ট হওয়ার সময় পাইপলাইনের সকল প্রসেসর একের পর এক প্রয়োগ হবে।Ingest Pipeline-এ Elasticsearch বিভিন্ন ধরনের প্রসেসর সাপোর্ট করে। প্রতিটি প্রসেসর নির্দিষ্ট একটি অপারেশন করে। নিচে কিছু সাধারণ প্রসেসর এবং তাদের কাজের উদাহরণ দেওয়া হলো:
set: নতুন ফিল্ড তৈরি করা বা বিদ্যমান ফিল্ডের মান সেট করা।
{
"set": {
"field": "environment",
"value": "production"
}
}
rename: ফিল্ডের নাম পরিবর্তন করা।
{
"rename": {
"field": "original_name",
"target_field": "new_name"
}
}
remove: একটি ফিল্ড রিমুভ করা।
{
"remove": {
"field": "temp_field"
}
}
uppercase/lowercase: একটি ফিল্ডের মানকে বড় বা ছোট অক্ষরে রূপান্তর করা।
{
"uppercase": {
"field": "status"
}
}
grok: একটি ফিল্ডের ডেটা প্যাটার্ন দিয়ে এনালাইজ করে ভ্যালু এক্সট্র্যাক্ট করা (যেমন লগ মেসেজ পার্স করা)।
{
"grok": {
"field": "message",
"patterns": ["%{TIMESTAMP_ISO8601:timestamp} %{LOGLEVEL:log_level} %{GREEDYDATA:message}"]
}
}
Pipeline পুনরায় ব্যবহার: সাধারণ ট্রান্সফরমেশনগুলির জন্য একই Pipeline বারবার ব্যবহার করুন যাতে কোড পুনরায় লেখার প্রয়োজন না হয়।
Pipeline মডুলার রাখা: বিভিন্ন ধরণের ডেটার জন্য আলাদা পাইপলাইন তৈরি করুন, যাতে প্রতিটি পাইপলাইন নির্দিষ্ট কাজের জন্য অ্যাপটিমাইজড থাকে।
Pipeline টেস্ট করা: ডেটা ইনডেক্স করার আগে পাইপলাইন টেস্ট করা উচিত যাতে প্রসেসিংয়ে কোনো ভুল না হয়। Elasticsearch এ _simulate
API ব্যবহার করে পাইপলাইন টেস্ট করা যায়:
POST /_ingest/pipeline/my-pipeline/_simulate
{
"docs": [
{
"_source": {
"message": "User login failed",
"log_level": "ERROR"
}
}
]
}
এনরিচমেন্ট প্রসেসর ব্যবহার: enrich
প্রসেসরের মাধ্যমে ইনডেক্স করা ডেটার উপর ভিত্তি করে নতুন ফিল্ড যোগ করা যায়।
Elasticsearch এ Data Processing Pipelines (Ingest Pipelines) ব্যবহার করে আপনি ডেটাকে ইনডেক্স করার পূর্বে ক্লিন, ট্রান্সফর্ম এবং এনরিচ করতে পারেন। এটি Elasticsearch-এ ডেটা ম্যানেজমেন্ট এবং প্রসেসিংকে অনেক বেশি কার্যকর এবং অ্যাপটিমাইজড করে তোলে। Ingest Pipelines তৈরি এবং ব্যবহারের মাধ্যমে আপনার ডেটা প্রসেসিং চেইনকে আরো ফ্লেক্সিবল এবং কাস্টমাইজড করতে পারবেন।
Read more